fix: argo workflows conditional nested static split joins #3002

Merged
saikonen merged 7 commits into master from fix/argo-conditional-static-join on Mar 11, 2026
Conversation

saikonen (Collaborator) commented Mar 11, 2026

Fixes an issue where a join-step nested inside a conditional branch did not have the correct dependencies. This would lead to the join step possibly executing prematurely, before all of its preceding steps had finished.

Example flow:

```python
from metaflow import FlowSpec, step


class JoinBugFlow(FlowSpec):
    @step
    def start(self):
        self.route = "a"
        self.next(self.switch_step)

    @step
    def switch_step(self):
        self.next(
            {"a": self.branch_a, "b": self.branch_b},
            condition="route",
        )

    @step
    def branch_a(self):
        self.next(self.sub_a, self.sub_b)

    @step
    def sub_a(self):
        self.next(self.sub_join)

    @step
    def sub_b(self):
        raise Exception("The sub_join should never start!")
        self.next(self.sub_join)

    @step
    def sub_join(self, inputs):
        self.next(self.shared)

    @step
    def branch_b(self):
        self.next(self.shared)

    @step
    def shared(self):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    JoinBugFlow()
```
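To make "executing prematurely" concrete, here is a toy model of how a dependency expression over `sub_a` and `sub_b` could gate `sub_join`. It is not Argo or Metaflow code, and the PR text does not show the expression that was generated before the fix, so the OR-style form below is only an assumed example of a too-permissive dependency. The helper names `any_succeeded` and `all_succeeded` are hypothetical.

```python
# Toy model only: task results are plain strings, not real Argo node statuses.
def any_succeeded(statuses, deps):
    """Models an OR-style expression such as `sub-a.Succeeded || sub-b.Succeeded`."""
    return any(statuses.get(d) == "Succeeded" for d in deps)


def all_succeeded(statuses, deps):
    """Models an AND-style expression such as `sub-a.Succeeded && sub-b.Succeeded`."""
    return all(statuses.get(d) == "Succeeded" for d in deps)


# State of the example flow once sub_a has finished and sub_b has raised.
statuses = {"sub-a": "Succeeded", "sub-b": "Failed"}
join_deps = ["sub-a", "sub-b"]

join_runs_with_or = any_succeeded(statuses, join_deps)
join_runs_with_and = all_succeeded(statuses, join_deps)
print(join_runs_with_or, join_runs_with_and)  # True False
```

Under the permissive form the join would be allowed to start even though `sub_b` failed, which is the behaviour the example flow is written to expose.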

@saikonen saikonen requested a review from mt-ob March 11, 2026 11:14
greptile-apps Bot (Contributor) commented Mar 11, 2026

Greptile Summary

This PR adds special-case dependency-string generation for Argo Workflows DAG tasks where a static-split join step is nested inside a conditional (split-switch) branch. Without the fix, the generated depends expression could be incorrect, which could let the join step execute before all of its predecessor steps have completed.

Key changes and observations:

  • A new block is inserted after the existing _is_conditional_skip_node / _many_in_funcs_all_conditional overrides. It activates when node.type == "join" and at least one in_func is a conditional node.
  • Two inner helpers are introduced: _split_switch_ancestors (recursive backwards traversal to collect split-switch ancestors up to the enclosing static-split boundary) and build_ancestor_tree (groups in-funcs by their frozenset of conditional ancestors, then chains subsets greedily to produce a nested dependency expression).
  • The isinstance(g, list) guard in build_ancestor_tree is dead code — node_groups values are always flat lists of step-name strings, so the branch that handles a nested list can never be reached.
  • No unit tests are added for the new algorithm. The only existing Argo unit test file (test/unit/test_argo_workflows_cli.py) covers only sanitize_for_argo, leaving the new grouping/chaining logic untested at the unit level.
  • Several correctness edge-cases (empty split_parents, missing visited set in recursion, operator choices for the emitted expression) were discussed extensively in prior review rounds.
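The traversal and grouping described in the bullets above can be sketched roughly as follows. This is an illustration and not the code from the PR: the dict-based graph, the function names `split_switch_ancestors` and `group_by_switch_ancestors`, and the `"split"` / `"linear"` type labels are assumptions made for the example. It includes the `visited` set that the review notes is missing, and it leaves out the greedy subset chaining entirely.

```python
from collections import defaultdict


def split_switch_ancestors(graph, name, boundary, visited=None):
    """Collect split-switch ancestors of `name`, walking in_funcs back to `boundary`."""
    visited = set() if visited is None else visited
    found = set()
    if name == boundary or name in visited:
        return found
    visited.add(name)  # guards against revisiting nodes on shared or cyclic paths
    for parent in graph[name]["in_funcs"]:
        if parent == boundary:
            continue  # stop at the enclosing static split
        if graph[parent]["type"] == "split-switch":
            found.add(parent)
        found |= split_switch_ancestors(graph, parent, boundary, visited)
    return found


def group_by_switch_ancestors(graph, join_name, boundary):
    """Group a join's in_funcs by the frozenset of their split-switch ancestors."""
    groups = defaultdict(list)
    for fn in graph[join_name]["in_funcs"]:
        key = frozenset(split_switch_ancestors(graph, fn, boundary))
        groups[key].append(fn)
    return dict(groups)


# Hypothetical graph: a static split ("fork") with a switch on one of its branches.
graph = {
    "fork": {"type": "split", "in_funcs": []},
    "left": {"type": "split-switch", "in_funcs": ["fork"]},
    "left_x": {"type": "linear", "in_funcs": ["left"]},
    "left_y": {"type": "linear", "in_funcs": ["left"]},
    "right": {"type": "linear", "in_funcs": ["fork"]},
    "merge": {"type": "join", "in_funcs": ["left_x", "left_y", "right"]},
}

groups = group_by_switch_ancestors(graph, "merge", "fork")
print(groups)
```

Here `left_x` and `left_y` share the ancestor set `{"left"}` and land in one group, while `right` has no switch ancestors and forms its own group.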

Confidence Score: 2/5

  • Not safe to merge — the new logic contains dead code, has no unit-test coverage, and several correctness concerns raised in prior rounds remain unresolved.
  • The algorithm is non-trivial and operates on a critical path (Argo depends expressions directly control task scheduling). The dead isinstance(g, list) branch is evidence of unvetted code. The recursive _split_switch_ancestors helper has no visited set (flagged previously) and both inner helpers are entirely untested. Multiple prior review comments about operator correctness and edge-case handling were acknowledged but not fully resolved in the diff.
  • metaflow/plugins/argo/argo_workflows.py — specifically the new block from line 1415 to 1497, _split_switch_ancestors, and build_ancestor_tree.

Important Files Changed

| Filename | Overview |
| --- | --- |
| metaflow/plugins/argo/argo_workflows.py | Adds a new code block to handle join-step dependency generation when in_funcs contain conditional nodes nested inside a static split. Introduces _split_switch_ancestors (recursive, no visited set) and build_ancestor_tree helpers. The dead isinstance(g, list) check in build_ancestor_tree is unreachable. Multiple correctness edge-cases have been flagged in prior review rounds. |

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    start --> switch_step
    switch_step -->|route == a| branch_a
    switch_step -->|route == b| branch_b
    branch_a -->|static split| sub_a
    branch_a -->|static split| sub_b
    sub_a --> sub_join["sub_join (join)"]
    sub_b --> sub_join
    sub_join --> shared["shared (conditional join)"]
    branch_b --> shared
    shared --> end_step[end]

    subgraph "New depends generation for sub_join"
        NB1["1. Detect: node.type==join AND any in_func is conditional"]
        NB2["2. Build node_groups by split_branches[-1]\n{branch_a: [sub_a, sub_b]}"]
        NB3["3. _split_switch_ancestors(fn, split_parents[-1])\ncollects any split-switch nodes between fn and the static split"]
        NB4["4. build_ancestor_tree groups by frozenset of switch ancestors\nand chains subsets together"]
        NB5["5. Emit depends string from required_deps\ne.g. ((sub-a.Succeeded || sub-b.Succeeded))"]
        NB1 --> NB2 --> NB3 --> NB4 --> NB5
    end
```
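Step 5 of the flowchart, emitting the depends string, could look something like the sketch below. The function names `to_task_name` and `depends_expression` are hypothetical. `to_task_name` is a simplified stand-in that only swaps underscores for hyphens and is not Metaflow's `sanitize_for_argo`. The operators are passed in explicitly because the review treats the operator choice as an open question, and the example call simply reproduces the string shown in the flowchart.

```python
def to_task_name(step_name):
    # Simplified stand-in for name sanitisation; NOT Metaflow's sanitize_for_argo.
    return step_name.replace("_", "-")


def depends_expression(groups, within, between):
    """Join each group's `<task>.Succeeded` terms with `within`, then join groups with `between`."""
    parts = []
    for group in groups:
        terms = [f"{to_task_name(step)}.Succeeded" for step in group]
        parts.append("(" + f" {within} ".join(terms) + ")")
    return "(" + f" {between} ".join(parts) + ")"


# One group holding both in_funcs of sub_join, matching the flowchart's example.
expr = depends_expression([["sub_a", "sub_b"]], within="||", between="&&")
print(expr)  # ((sub-a.Succeeded || sub-b.Succeeded))
```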

Last reviewed commit: 8939b41

mt-ob previously approved these changes Mar 11, 2026
@saikonen saikonen merged commit 5a62706 into master Mar 11, 2026
33 checks passed
@saikonen saikonen deleted the fix/argo-conditional-static-join branch March 11, 2026 23:52
Comment on lines +1450 to +1454
```python
nodes = [
    n
    for g in children
    for n in (g if isinstance(g, list) else [g])
]
```
Dead isinstance(g, list) branch — always evaluates to [g]

children in node_groups is always a flat list of step-name strings. It is populated exclusively via new_funcs.append(fn) where fn is a string from node.in_funcs. Consequently isinstance(g, list) is always False, the g branch is dead code, and the comprehension is always equivalent to nodes = list(children).

Keeping the guard creates noise for future maintainers and may suggest that node_groups can hold nested lists when it cannot.

Suggested change
```diff
- nodes = [
-     n
-     for g in children
-     for n in (g if isinstance(g, list) else [g])
- ]
+ nodes = list(children)
```
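A quick standalone check of the equivalence claimed above, using made-up step names. It assumes, as the comment states, that `children` only ever holds strings. The nested case at the end is included only to show the one input shape for which the guard would change the result.

```python
# Flat list of step-name strings, the only shape the review says can occur.
children = ["sub_a", "sub_b"]

flattened = [
    n
    for g in children
    for n in (g if isinstance(g, list) else [g])
]
print(flattened == list(children))  # True

# Hypothetical nested input: the only case where the guard makes a difference.
nested = [["sub_a", "sub_b"], "sub_c"]
flattened_nested = [
    n
    for g in nested
    for n in (g if isinstance(g, list) else [g])
]
print(flattened_nested)  # ['sub_a', 'sub_b', 'sub_c']
```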

2 participants